use crate::extension::index::index::query::index_entry::IndexEntry;
use ordermap::OrderSet;
use std::cell::RefCell;
use std::sync::Arc;

/// Query helper that answers comparison, range and lookup questions against a
/// shared [`IndexEntry`].
pub struct IndexEntryOperator<IE: IndexEntry> {
    /// Shared handle to the index entry that the queries read from.
    index_entry: Arc<RefCell<IE>>,
}

impl<IE> IndexEntryOperator<IE>
where
    IE: IndexEntry,
{
    /// Creates an operator that runs its queries against the given shared
    /// `index_entry` handle.
    pub fn new(index_entry: Arc<RefCell<IE>>) -> Self {
        Self { index_entry }
    }
}

/// Range-style selections over a set: the elements below a bound, above a
/// bound, or between two bounds.
///
/// Every method returns a newly built `OrderSet` rather than a view into `self`.
pub trait NavigableSet<T> {
    /// Returns the elements of `self` that lie below `element`.
    ///
    /// `inclusive` selects whether elements equal to `element` are part of the
    /// result.
    fn head_set(&self, element: &T, inclusive: bool) -> OrderSet<T>;
    /// Returns the elements of `self` that lie above `element`.
    ///
    /// `inclusive` selects whether elements equal to `element` are part of the
    /// result.
    fn tail_set(&self, element: &T, inclusive: bool) -> OrderSet<T>;
    /// Returns the elements of `self` that lie between `from_element` and
    /// `to_element`.
    ///
    /// `from_inclusive` and `to_inclusive` select, independently for each end,
    /// whether elements equal to that bound are part of the result.
    fn sub_set(
        &self,
        from_element: &T,
        from_inclusive: bool,
        to_element: &T,
        to_inclusive: bool,
    ) -> OrderSet<T>;
}
/// Range selections over an [`OrderSet`], computed by comparing every element
/// against the bound(s).
///
/// Each method scans the whole set and keeps the matching elements in their
/// original iteration order, so the result does not depend on the set being
/// sorted.
impl<T> NavigableSet<T> for OrderSet<T>
where
    T: PartialOrd<T> + Clone + std::hash::Hash + Eq,
{
    /// Returns the elements that are `< element`, or `<= element` when
    /// `inclusive` is set.
    fn head_set(&self, element: &T, inclusive: bool) -> OrderSet<T> {
        // `filter`, not `take_while`: the set iterates in insertion order, so
        // an element above the bound must not hide qualifying elements that
        // were inserted after it. This also matches `tail_set` / `sub_set`.
        self.iter()
            .filter(|&x| if inclusive { *x <= *element } else { *x < *element })
            .cloned()
            .collect()
    }

    /// Returns the elements that are `> element`, or `>= element` when
    /// `inclusive` is set.
    fn tail_set(&self, element: &T, inclusive: bool) -> OrderSet<T> {
        self.iter()
            .filter(|&x| if inclusive { *x >= *element } else { *x > *element })
            .cloned()
            .collect()
    }

    /// Returns the elements between `from_element` and `to_element`.
    ///
    /// Each bound is checked on its own: `from_inclusive` decides between `>=`
    /// and `>` for the lower bound, `to_inclusive` between `<=` and `<` for the
    /// upper bound. When the lower bound is above the upper bound nothing can
    /// satisfy both checks and the result is empty.
    fn sub_set(
        &self,
        from_element: &T,
        from_inclusive: bool,
        to_element: &T,
        to_inclusive: bool,
    ) -> OrderSet<T> {
        let above_lower = |x: &T| {
            if from_inclusive {
                *x >= *from_element
            } else {
                *x > *from_element
            }
        };
        let below_upper = |x: &T| {
            if to_inclusive {
                *x <= *to_element
            } else {
                *x < *to_element
            }
        };

        self.iter()
            .filter(|&x| above_lower(x) && below_upper(x))
            .cloned()
            .collect()
    }
}

/// Query operations. Each one reads the shared index entry through an
/// immutable `RefCell` borrow.
impl<IE> IndexEntryOperator<IE>
where
    IE: IndexEntry,
{
    /// Returns the values of all entries whose indexed key is in the head set
    /// of `key` (keys below `key`; `or_equal` also admits `key` itself).
    pub fn less_than(&self, key: &String, or_equal: bool) -> OrderSet<String> {
        let candidates = self
            .index_entry
            .borrow()
            .indexed_keys()
            .head_set(key, or_equal);
        self.find_in(&candidates)
    }

    /// Returns the values of all entries whose indexed key is in the tail set
    /// of `key` (keys above `key`; `or_equal` also admits `key` itself).
    ///
    /// NOTE(review): the name looks like a misspelling of `greater_than`; it is
    /// kept because renaming would break callers.
    pub fn greater_then(&self, key: &String, or_equal: bool) -> OrderSet<String> {
        let candidates = self
            .index_entry
            .borrow()
            .indexed_keys()
            .tail_set(key, or_equal);
        self.find_in(&candidates)
    }

    /// Returns the values of all entries whose indexed key lies between
    /// `start` and `end`.
    ///
    /// `start_inclusive` / `end_inclusive` decide whether a key equal to the
    /// respective bound is admitted.
    pub fn range(
        &self,
        start: &String,
        end: &String,
        start_inclusive: bool,
        end_inclusive: bool,
    ) -> OrderSet<String> {
        let candidates = self
            .index_entry
            .borrow()
            .indexed_keys()
            .sub_set(start, start_inclusive, end, end_inclusive);
        self.find_in(&candidates)
    }

    /// Returns, as a set, the object names the index entry reports for `key`.
    pub fn find(&self, key: &str) -> OrderSet<String> {
        let object_names = self.index_entry.borrow().get_object_names_by(key);
        object_names.into_iter().collect()
    }

    /// Returns the values of all entries whose key is contained in `keys`.
    ///
    /// The result follows the order in which the index entry yields its
    /// entries, not the order of `keys`.
    pub fn find_in(&self, keys: &OrderSet<String>) -> OrderSet<String> {
        let entry = self.index_entry.borrow();
        let mut matched = OrderSet::new();
        for (key, value) in entry.entries() {
            if keys.contains(key) {
                matched.insert(value.clone());
            }
        }
        matched
    }

    /// Returns the values of all entries, in the order the index entry yields
    /// them.
    pub fn get_values(&self) -> OrderSet<String> {
        let entry = self.index_entry.borrow();
        entry.entries().map(|(_, value)| value.clone()).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::app_error::AppResult;
    use crate::extension::index::index::query::index_descriptor::IndexDescriptor;
    use crate::tests::index_view_data_set;
    use crate::tests::index_view_data_set::FakeExtension;
    use ordermap::OrderMap;
    use std::collections::HashMap;

    /// Builds an ordered set of owned strings from string literals.
    fn string_set(items: &[&str]) -> OrderSet<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Builds an ordered set of integers from a slice.
    fn int_set(items: &[i32]) -> OrderSet<i32> {
        items.iter().copied().collect()
    }

    /// Turns `(indexed key, object name)` literal pairs into owned pairs.
    fn owned_pairs(raw: &[(&str, &str)]) -> Vec<(String, String)> {
        raw.iter()
            .map(|(key, object)| (key.to_string(), object.to_string()))
            .collect()
    }

    /// Wraps a mock entry in the operator under test.
    fn operator_over(entry: MockIndexEntry) -> IndexEntryOperator<MockIndexEntry> {
        IndexEntryOperator::new(Arc::new(RefCell::new(entry)))
    }

    /// Shared fixture: six fruit-like keys, each mapped to a one-letter object
    /// name. Returns the key -> objects map together with the mock built on it.
    fn create_indexed_map_and_pile() -> (OrderMap<String, Vec<String>>, MockIndexEntry) {
        let entries = owned_pairs(&[
            ("apple", "A"),
            ("banana", "B"),
            ("cherry", "C"),
            ("date", "D"),
            ("egg", "E"),
            ("f", "F"),
        ]);

        let indexed_map = index_view_data_set::to_key_object_map(&entries);
        let index_entry = MockIndexEntry::mock(entries, indexed_map.clone());

        (indexed_map, index_entry)
    }

    #[test]
    fn less_than() {
        let (indexed_map, index_entry) = create_indexed_map_and_pile();
        let operator = operator_over(index_entry);
        let below = |key: &str, or_equal| operator.less_than(&key.to_string(), or_equal);

        assert_eq!(below("banana", false), string_set(&["A"]));
        assert_eq!(below("banana", true), string_set(&["A", "B"]));
        assert_eq!(below("cherry", true), string_set(&["A", "B", "C"]));

        // "z" is not an indexed key and compares above all of them, so every
        // object is expected.
        let every_object: OrderSet<String> = indexed_map.values().flatten().cloned().collect();
        assert_eq!(below("z", false), every_object);

        // "a" compares below every indexed key, so nothing is expected.
        assert!(below("a", false).is_empty());
    }

    #[test]
    fn greater_then() {
        let (_, index_entry) = create_indexed_map_and_pile();
        let operator = operator_over(index_entry);
        let above = |key: &str, or_equal| operator.greater_then(&key.to_string(), or_equal);

        assert_eq!(above("banana", false), string_set(&["C", "D", "E", "F"]));
        assert_eq!(
            above("banana", true),
            string_set(&["B", "C", "D", "E", "F"])
        );
        assert_eq!(above("cherry", true), string_set(&["C", "D", "E", "F"]));
        assert_eq!(above("cherry", false), string_set(&["D", "E", "F"]));

        // "z" compares above every indexed key, so nothing is expected.
        assert!(above("z", false).is_empty());
    }

    #[test]
    fn greater_then_for_number_string() {
        let entries = owned_pairs(&[
            ("100", "1"),
            ("101", "2"),
            ("102", "3"),
            ("103", "4"),
            ("110", "5"),
            ("111", "6"),
            ("112", "7"),
            ("120", "8"),
        ]);
        let indexed_map = index_view_data_set::to_key_object_map(&entries);
        let operator = operator_over(MockIndexEntry::mock(entries, indexed_map));

        assert_eq!(
            operator.greater_then(&"102".to_string(), false),
            string_set(&["4", "5", "6", "7", "8"])
        );
        assert_eq!(
            operator.greater_then(&"110".to_string(), false),
            string_set(&["6", "7", "8"])
        );
    }

    #[test]
    fn range() {
        let (_, index_entry) = create_indexed_map_and_pile();
        let operator = operator_over(index_entry);
        let between = |start: &str, end: &str, start_inclusive: bool, end_inclusive: bool| {
            operator.range(
                &start.to_string(),
                &end.to_string(),
                start_inclusive,
                end_inclusive,
            )
        };

        assert_eq!(
            between("banana", "date", true, false),
            string_set(&["B", "C"])
        );
        assert_eq!(between("banana", "date", false, false), string_set(&["C"]));
        assert_eq!(
            between("banana", "date", true, true),
            string_set(&["B", "C", "D"])
        );
        assert_eq!(
            between("apple", "egg", false, true),
            string_set(&["B", "C", "D", "E"])
        );

        // The end key is not an indexed key.
        assert_eq!(
            between("d", "z", false, false),
            string_set(&["D", "E", "F"])
        );
        // The start key compares above the end key.
        assert!(between("z", "f", false, false).is_empty());
        // Neither bound is an indexed key.
        assert!(between("z", "zz", false, false).is_empty());
    }

    #[test]
    fn find() {
        let (_, index_entry) = create_indexed_map_and_pile();
        let operator = operator_over(index_entry);

        assert_eq!(operator.find("banana"), string_set(&["B"]));
        assert_eq!(operator.find("date"), string_set(&["D"]));
        assert!(operator.find("z").is_empty());
    }

    #[test]
    fn find_in() {
        let (_, index_entry) = create_indexed_map_and_pile();
        let operator = operator_over(index_entry);

        let wanted_keys = string_set(&["banana", "date"]);
        assert_eq!(operator.find_in(&wanted_keys), string_set(&["B", "D"]));
    }

    /// Minimal `IndexEntry` stand-in: only the read-side methods used by
    /// `IndexEntryOperator` are implemented, everything else is `todo!()`.
    pub struct MockIndexEntry {
        /// Flat `(indexed key, object name)` pairs served by `entries()`.
        entries: Vec<(String, String)>,
        /// Indexed key -> object names, served by `indexed_keys()` and
        /// `get_object_names_by()`.
        index_map: OrderMap<String, Vec<String>>,
    }

    impl MockIndexEntry {
        /// Creates a mock from the flat pairs and the matching key -> objects map.
        pub(crate) fn mock(
            entries: Vec<(String, String)>,
            index_map: OrderMap<String, Vec<String>>,
        ) -> Self {
            Self { entries, index_map }
        }
    }

    impl IndexEntry for MockIndexEntry {
        type T = FakeExtension;

        fn add_entry(&mut self, _index_keys: Vec<String>, _object_key: String) -> AppResult<()> {
            todo!()
        }

        fn remove_entry(&mut self, _indexed_key: String, _object_key: String) {
            todo!()
        }

        fn remove(&mut self, _object_key: String) {
            todo!()
        }

        fn get_index_descriptor(&self) -> &IndexDescriptor<Self::T> {
            todo!()
        }

        fn get_inner_index_descriptor(self) -> IndexDescriptor<Self::T> {
            todo!()
        }

        fn indexed_keys(&self) -> OrderSet<String> {
            self.index_map.keys().cloned().collect()
        }

        fn entries(&self) -> impl Iterator<Item = (&String, &String)> {
            // `iter()` yields `&(String, String)`; the map turns each item into
            // the `(&String, &String)` pair named in the return type.
            self.entries.iter().map(|(key, object)| (key, object))
        }

        fn get_id_position_map(&self) -> HashMap<String, i32> {
            todo!()
        }

        fn get_object_names_by(&self, index_key: &str) -> Vec<String> {
            self.index_map
                .get(index_key)
                .cloned()
                .unwrap_or_default()
        }

        fn clear(&mut self) {
            todo!()
        }
    }

    #[test]
    fn head_set_inclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.head_set(&5, true), int_set(&[1, 3, 5]));
    }

    #[test]
    fn head_set_exclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.head_set(&5, false), int_set(&[1, 3]));
    }

    #[test]
    fn tail_set_inclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.tail_set(&5, true), int_set(&[5, 7, 9]));
    }

    #[test]
    fn tail_set_exclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.tail_set(&5, false), int_set(&[7, 9]));
    }

    #[test]
    fn sub_set_from_inclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.sub_set(&5, true, &7, true), int_set(&[5, 7]));
    }

    #[test]
    fn sub_set_from_exclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.sub_set(&5, false, &7, true), int_set(&[7]));
    }

    #[test]
    fn sub_set_to_exclusive() {
        let odds = int_set(&[1, 3, 5, 7, 9]);
        assert_eq!(odds.sub_set(&5, true, &7, false), int_set(&[5]));
    }

    #[test]
    fn sub_set_all_exclusive() {
        let numbers = int_set(&[1, 3, 5, 6, 7, 9]);
        assert_eq!(numbers.sub_set(&5, false, &7, false), int_set(&[6]));
    }
}
